{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Criando Grafos com Apache Graphx"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Criação de Grafos com a bilbioteca GraphX do Spark e GraphFrames\n",
    "![Alt Text](./img/graphx-logo.svg) "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Carrega as bibliotecas de GraphFrames e loggin do scala usada pelo GrapgFrames\n",
    "import os\n",
    "jars    = [\"jars/graphframes-0.5.0-spark2.1-s_2.11.jar\", \"jars/scala-logging-api_2.11-2.1.2.jar\", \"jars/scala-logging-slf4j_2.11-2.1.2.jar\"]\n",
    "pyfiles = [\"pyfiles/graphframes.zip\"]\n",
    "os.environ[\"PYSPARK_SUBMIT_ARGS\"] = \" --jars \"+ \",\".join(jars)  +\" --py-files \"+ \",\".join(pyfiles)  +\"  pyspark-shell\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sys\n",
    "import findspark\n",
    "import json, random\n",
    "\n",
    "import pandas as pd\n",
    "import numpy as np\n",
    "from IPython.core.display import HTML\n",
    "findspark.init()\n",
    "\n",
    "import pyspark\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql.types import StringType, IntegerType\n",
    "from pyspark.sql.types import StructType,StructField,ArrayType"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<pyspark.context.SparkContext at 0x7f475978fa20>"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark = SparkSession\\\n",
    "    .builder\\\n",
    "    .appName(\"SparkGraphx\")\\\n",
    "    .getOrCreate()\n",
    "spark.sparkContext.setCheckpointDir(\"log\")\n",
    "spark.sparkContext"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Leitura dos dados de usuários (User.csv) e conexão entre os usuários (UserGraph.csv)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+------+\n",
      "|  id|  NAME|\n",
      "+----+------+\n",
      "|1090|Jessie|\n",
      "|1159|Melvin|\n",
      "+----+------+\n",
      "only showing top 2 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "user_df = spark.read.format(\"csv\").option(\"header\", \"true\")\\\n",
    "                .load(\"dataset/User.csv\")\\\n",
    "                .withColumnRenamed(\"ID\", \"id\")\n",
    "        \n",
    "user_df.show(2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+----+\n",
      "| src| dst|\n",
      "+----+----+\n",
      "|1090|5309|\n",
      "|1090|3547|\n",
      "+----+----+\n",
      "only showing top 2 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "user_graph_df = spark.read.format(\"csv\").option(\"header\", \"true\")\\\n",
    "                .load(\"dataset/UserGraph.csv\")\\\n",
    "                .withColumnRenamed(\"USER_1\", \"src\")\\\n",
    "                .withColumnRenamed(\"USER_2\", \"dst\")\n",
    "            \n",
    "user_graph_df.show(2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Criando Grafo a partir dos dados - Graph "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "from graphframes import *"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Total de Vertices (usuários): 6486\n",
      "Total de Arestas (Conexões): 336534\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "GraphFrame(v:[id: string, NAME: string], e:[src: string, dst: string])"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Carrega os dados para a Classe do GraphFrame\n",
    "#\n",
    "g = GraphFrame(user_df, user_graph_df)\n",
    "print(\"Total de Vertices (usuários): %d\" % g.vertices.count())\n",
    "print(\"Total de Arestas (Conexões): %d\" % g.edges.count())\n",
    "g.cache()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Vamos descobrir **qual é o usuário com maior quantidade de conexões**, ou seja, qual pessoa tem mais amigos nessa rede social. Para isso utilizamos a $inDegress$, que retorna a quantidade de conexões entre os vértices."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 52,
   "metadata": {},
   "outputs": [],
   "source": [
    "def get_most_connected(g, topN):\n",
    "    '''\n",
    "    Return a list containing the most friends\n",
    "    '''\n",
    "    g_indegrees = g.inDegrees\n",
    "    return g.vertices.join(g_indegrees, \"id\")\\\n",
    "            .orderBy(\"inDegree\", ascending=False).limit(topN)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 67,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+---------+--------+\n",
      "|  id|     NAME|inDegree|\n",
      "+----+---------+--------+\n",
      "| 859|   Hallie|    1933|\n",
      "|5306|     Arch|    1741|\n",
      "|2664|     Edna|    1528|\n",
      "|5716|   Dalton|    1426|\n",
      "|6306| Napoleon|    1394|\n",
      "|3805|Arlington|    1386|\n",
      "|2557|    Giles|    1371|\n",
      "|4898| Gottlieb|    1345|\n",
      "|5736|    Lemon|    1289|\n",
      "| 403|    Alvah|    1280|\n",
      "+----+---------+--------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "most_connected = get_most_connected(g, 10)\n",
    "most_connected.show(10)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Ou seja, o usuário \"Hallie\" tem um total de 1933 conexões.  Para exibir os usuários conectados a \"Hallie\", podemos usar a função $find$, que utiliza alguns modificadores para mapear a busca em Gráfos. Ex:\n",
    "$$\n",
    "(a)-[e]->(b)\n",
    "$$\n",
    "significa que, um vertice qualquer \"a\", deve estar ligado por uma aresta \"e\" a um vértice qualquer \"b\". Dessa forma é possível construir diveras Query diferentes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 68,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Total de usuários conectados a 'Hallie':  1933\n",
      "+----+-------+\n",
      "|  id|   NAME|\n",
      "+----+-------+\n",
      "|1572| Finley|\n",
      "|2069|Patrick|\n",
      "|2904|   Linn|\n",
      "|3210|Preston|\n",
      "|3606|Colonel|\n",
      "+----+-------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "def get_users_connected(g, user_id):\n",
    "    '''\n",
    "    Return a list of connected users\n",
    "    '''\n",
    "    return g.find(\"(a)-[e]->(b)\")\\\n",
    "            .filter(\"b.id = %d\" % user_id)\\\n",
    "            .select(\"a.id\", \"a.NAME\")\n",
    "\n",
    "users = get_users_connected(g, 859)\n",
    "print(\"Total de usuários conectados a 'Hallie': \", users.count())\n",
    "users.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Podemos usar uma consulta semelhante pra encontrar amigos em comum entre dois usuários que não estão conecatados. O famoso \"pessoas que talvez você conheça\". Algo como:\n",
    "$$\n",
    "(a)-[e]->(b); (b)-[e2]->(c); !(a)-[]->(c)\n",
    "$$\n",
    "Ou seja, queremos um usuário \"a\" que tenha conexão com \"b\" [(a)-[e]->(b);], sendo que \"b\" tem conexão com \"c\", mas [(b)-[e2]->(c);], mas \"a\" e \"c\" não tenha conexão [!(c)-[]->(a)]."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 69,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Total de possíveis amigos:  15181\n",
      "+----+-------+\n",
      "|  id|   name|\n",
      "+----+-------+\n",
      "|1289|  Abram|\n",
      "|1484|Wilburn|\n",
      "|4040|  Ralph|\n",
      "| 208|   Andy|\n",
      "|3750| Emmitt|\n",
      "+----+-------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "def get_friends_suggestion(g, user_id):\n",
    "    '''\n",
    "    Returns a list of suggested friendships \"people you may know\"\n",
    "    '''\n",
    "    users = g.find(\"(a)-[e]->(b); (b)-[e2]->(c); !(a)-[]->(c)\")\\\n",
    "                .filter(\"a.id = %d\" % user_id)\n",
    "    return users.select(\"c.id\", \"c.name\")\n",
    "\n",
    "users = get_friends_suggestion(g, 1572).cache()\n",
    "print(\"Total de possíveis amigos: \", users.count())\n",
    "users.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Subgraphs\n",
    "\n",
    "Os sub-grafos são uma amostra do grafo original, um subgrupo que pode ter interpretação própria. Vamos separa um sub-grafo dos usuários que tem conexão entre dois outros usuários. Ou seja, dado dois usuários, gostaria de saber todos os usuários que tem conexão."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "def get_suggraphs_between_users(g, user1_id, user2_id):\n",
    "    '''\n",
    "    Return sugraphs conections between user1 and user2\n",
    "    '''\n",
    "    g_user  = g.find(\"(a)-[e]->(b); (c)-[e2]->(b)\")\\\n",
    "                .filter(\"a.id = {} and c.id = {}\".format(user1_id, user2_id))\n",
    "\n",
    "    g_vert  = g_user.select(\"a.id\", \"a.NAME\")\\\n",
    "                .unionAll(g_user.select(\"b.id\", \"b.NAME\"))\\\n",
    "                .unionAll(g_user.select(\"c.id\", \"c.NAME\")).distinct()\n",
    "\n",
    "    g_edges = g_user.select(\"e.src\", \"e.dst\")\\\n",
    "                .unionAll(g_user.select(\"e2.src\", \"e2.dst\")).distinct()\n",
    "\n",
    "    g2 = GraphFrame(g_vert, g_edges)\n",
    "\n",
    "    return g2"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "user1_id = 4845 # Winnie\n",
    "user2_id = 1572 # Finley\n",
    "\n",
    "# SubGraph with Winnie connections between Finley\n",
    "g_users = get_suggraphs_between_users(g, user1_id, user2_id)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+------+\n",
      "|  id|  NAME|\n",
      "+----+------+\n",
      "|  10|Edward|\n",
      "| 337|Josiah|\n",
      "|4845|Winnie|\n",
      "|1572|Finley|\n",
      "|2548|  Dora|\n",
      "+----+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "g_users.vertices.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Lista com os usuáriso que conexão com $user1_id$ e $user2_id$, e o Sub-Graph dessa dessa conexão."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "![Alt Text](./img/graph.png) "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Essa análise pode ser útil para identificar possíveis amigos em comum entre dois usuários."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Salvar GraphFrames em Json para visualização da biblioteca 3d\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "def graph2json(graph):\n",
    "    '''\n",
    "    Transform GraphFrames to Json File\n",
    "    '''\n",
    "    vertices = [{\"id\": v.id, \"name\": v.NAME, \"group\": 1} for v in graph.vertices.collect()]\n",
    "    edges    = [{\"source\": v.src, \"target\": v.dst, \"value\": 5} for v in graph.edges.collect()]\n",
    "\n",
    "    graph_json = {\"nodes\":vertices, \"links\": edges}\n",
    "    return graph_json\n",
    "\n",
    "def graph_to_file(graph, path):\n",
    "    '''\n",
    "    Save GraphFrames in Json File\n",
    "    '''\n",
    "    graph_json = graph2json(graph)\n",
    "    with open(path, 'w') as file:\n",
    "        json.dump(graph_json, file, ensure_ascii=False)\n",
    "    return path"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'graph.json'"
      ]
     },
     "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "graph_to_file(g_users, \"graph.json\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
